---
title: How to write interactive workflows
sidebarTitle: Write interactive workflows
---


Flows can pause or suspend execution and automatically resume when they receive type-checked input in Prefect's UI.
Flows can also send and receive type-checked input at any time while running—without pausing or suspending.

This guide explains how to use these features to build _interactive workflows_.


## Pause or suspend a flow until it receives input

You can pause or suspend a flow until it receives input from a user in Prefect's UI. This is useful when you need to ask for
additional information or feedback before resuming a flow. These workflows are often called
[human-in-the-loop](https://hai.stanford.edu/news/humans-loop-design-interactive-ai-systems) (HITL) systems.

<Note>
**Human-in-the-loop interactivity**
Approval workflows that pause to ask a human to confirm whether a workflow should continue are very common in the business world.
Certain types of [machine learning training](https://link.springer.com/article/10.1007/s10462-022-10246-w) and artificial intelligence
workflows benefit from incorporating HITL design.
</Note>

### Wait for input

To receive input while paused or suspended use the `wait_for_input` parameter in the `pause_flow_run` or `suspend_flow_run` functions.
This parameter accepts one of the following:

- A built-in type like `int` or `str`, or a built-in collection like `List[int]`
- A `pydantic.BaseModel` subclass
- A subclass of `prefect.input.RunInput`

<Tip>
When to use a `RunModel` or `BaseModel` instead of a built-in type"
There are a few reasons to use a `RunModel` or `BaseModel`. The first is that when you let Prefect automatically create one of these
classes for your input type, the field that users see in Prefect's UI when they click "Resume" on a flow run is named `value` and
has no help text to suggest what the field is. If you create a `RunInput` or `BaseModel`, you can change details like the field name,
help text, and default value, and users see those reflected in the "Resume" form.
</Tip>

The simplest way to pause or suspend and wait for input is to pass a built-in type:

```python
from prefect import flow
from prefect.flow_runs import pause_flow_run
from prefect.logging import get_run_logger

@flow
def greet_user():
    logger = get_run_logger()

    user = pause_flow_run(wait_for_input=str)

    logger.info(f"Hello, {user}!")
```

In this example, the flow run pauses until a user clicks the Resume button in the Prefect UI, enters a name, and submits the form.

<Note>
**Types can you pass for `wait_for_input`**

When you pass a built-in type such as `int` as an argument for the `wait_for_input` parameter to `pause_flow_run` or `suspend_flow_run`,
Prefect automatically creates a Pydantic model containing one field annotated with the type you specified. This means you can use
[any type annotation that Pydantic accepts for model fields](https://docs.pydantic.dev/1.10/usage/types/) with these functions.
</Note>

Instead of a built-in type, you can pass in a `pydantic.BaseModel` class. This is useful if you already have a `BaseModel` you want to use:

```python
from prefect import flow
from prefect.flow_runs import pause_flow_run
from prefect.logging import get_run_logger
from pydantic import BaseModel


class User(BaseModel):
    name: str
    age: int


@flow
async def greet_user():
    logger = get_run_logger()

    user = await pause_flow_run(wait_for_input=User)

    logger.info(f"Hello, {user.name}!")
```

<Note>
**`BaseModel` classes are upgraded to `RunInput` classes automatically**

When you pass a `pydantic.BaseModel` class as the `wait_for_input` argument to `pause_flow_run` or `suspend_flow_run`, Prefect
automatically creates a `RunInput` class with the same behavior as your `BaseModel` and uses that instead.

`RunInput` classes contain extra logic that allows flows to send and receive them at runtime. You shouldn't notice any difference.
</Note>

For advanced use cases such as overriding how Prefect stores flow run inputs, create a `RunInput` class:

```python
from prefect import flow
from prefect.logging import get_run_logger
from prefect.input import RunInput

class UserInput(RunInput):
    name: str
    age: int

    # Imagine overridden methods here.
    def override_something(self, *args, **kwargs):
        super().override_something(*args, **kwargs)

@flow
async def greet_user():
    logger = get_run_logger()

    user = await pause_flow_run(wait_for_input=UserInput)

    logger.info(f"Hello, {user.name}!")
```

### Provide initial data

Set default values for fields in your model with the `with_initial_data` method. This is useful for providing default values
for the fields in your own `RunInput` class.

Expanding on the example above, you can make the `name` field default to "anonymous":

```python
from prefect import flow
from prefect.logging import get_run_logger
from prefect.input import RunInput

class UserInput(RunInput):
    name: str
    age: int

@flow
async def greet_user():
    logger = get_run_logger()

    user_input = await pause_flow_run(
        wait_for_input=UserInput.with_initial_data(name="anonymous")
    )

    if user_input.name == "anonymous":
        logger.info("Hello, stranger!")
    else:
        logger.info(f"Hello, {user_input.name}!")
```

When a user sees the form for this input, the name field contains "anonymous" as the default.

### Provide a description with runtime data

You can provide a dynamic, Markdown description that appears in the Prefect UI when the flow run pauses.
This feature enables context-specific prompts, enhancing clarity and user interaction. Building on the example above:

```python
from datetime import datetime
from prefect import flow
from prefect.flow_runs import pause_flow_run
from prefect.logging import get_run_logger
from prefect.input import RunInput


class UserInput(RunInput):
    name: str
    age: int


@flow
async def greet_user():
    logger = get_run_logger()
    current_date = datetime.now().strftime("%B %d, %Y")

    description_md = f"""
**Welcome to the User Greeting Flow!**
Today's Date: {current_date}

Please enter your details below:
- **Name**: What should we call you?
- **Age**: Just a number, nothing more.
"""

    user_input = await pause_flow_run(
        wait_for_input=UserInput.with_initial_data(
            description=description_md, name="anonymous"
        )
    )

    if user_input.name == "anonymous":
        logger.info("Hello, stranger!")
    else:
        logger.info(f"Hello, {user_input.name}!")
```

When a user sees the form for this input, the given Markdown appears above the input fields.

### Handle custom validation

Prefect uses the fields and type hints on your `RunInput` or `BaseModel` class to validate the general structure of input your flow receives.
If you require more complex validation, use Pydantic [model_validators](https://docs.pydantic.dev/latest/concepts/validators/#model-validators).

<Warning>
**Calling custom validation runs after the flow resumes**

Prefect transforms the type annotations in your `RunInput` or `BaseModel` class to a JSON schema and uses that schema in
the UI for client-side validation. However, custom validation requires running _Python_ logic defined in your `RunInput` class.
Because of this, validation happens _after the flow resumes_, so you should handle it explicitly in your flow.
Continue reading for an example best practice.
</Warning>

The following is an example `RunInput` class that uses a custom `model_validator`:

```python
from typing import Literal

import pydantic

from prefect.input import RunInput


class ShirtOrder(RunInput):
    size: Literal["small", "medium", "large", "xlarge"]
    color: Literal["red", "green", "black"]

    @pydantic.model_validator(mode="after")
    def validate_age(self):
        if self.color == "green" and self.size == "small":
            raise ValueError(
                "Green is only in-stock for medium, large, and XL sizes."
            )

        return self
```

In the example, we use Pydantic's `model_validator` decorator to define custom validation for our `ShirtOrder` class.
You can use it in a flow like this:

```python
from typing import Literal

import pydantic

from prefect import flow, pause_flow_run
from prefect.input import RunInput


class ShirtOrder(RunInput):
    size: Literal["small", "medium", "large", "xlarge"]
    color: Literal["red", "green", "black"]

    @pydantic.model_validator(mode="after")
    def validate_age(self):
        if self.color == "green" and self.size == "small":
            raise ValueError(
                "Green is only in-stock for medium, large, and XL sizes."
            )

        return self


@flow
def get_shirt_order():
    shirt_order = pause_flow_run(wait_for_input=ShirtOrder)
```

If a user chooses any size and color combination other than `small` and `green`, the flow run resumes successfully.
However, if the user chooses size `small` and color `green`, the flow run will resume, and `pause_flow_run` raises a
`ValidationError` exception. This causes the flow run to fail and log the error.

To avoid a flow run failure, use a `while` loop and pause again if the `ValidationError` exception is raised:

```python
from typing import Literal

import pydantic
from prefect import flow
from prefect.flow_runs import pause_flow_run
from prefect.logging import get_run_logger
from prefect.input import RunInput


class ShirtOrder(RunInput):
    size: Literal["small", "medium", "large", "xlarge"]
    color: Literal["red", "green", "black"]

    @pydantic.model_validator(mode="after")
    def validate_age(self):
        if self.color == "green" and self.size == "small":
            raise ValueError(
                "Green is only in-stock for medium, large, and XL sizes."
            )

        return self


@flow
def get_shirt_order():
    logger = get_run_logger()
    shirt_order = None

    while shirt_order is None:
        try:
            shirt_order = pause_flow_run(wait_for_input=ShirtOrder)
        except pydantic.ValidationError as exc:
            logger.error(f"Invalid size and color combination: {exc}")

    logger.info(
        f"Shirt order: {shirt_order.size}, {shirt_order.color}"
    )
```

This code causes the flow run to continually pause until the user enters a valid age.

As an additional step, you can use an [automation](/v3/automate/events/automations-triggers) to alert the user to the error.

## Send and receive input at runtime

Use the `send_input` and `receive_input` functions to send input to a flow or receive input from a flow at runtime.
You don't need to pause or suspend the flow to send or receive input.

<Note>
**Reasons to send or receive input without pausing or suspending**

You might want to send or receive input without pausing or suspending in scenarios where the flow run is designed to handle
real-time data. For example, in a live monitoring system, you might need to update certain parameters based on the incoming
data without interrupting the flow. Another example is having a long-running flow that continually responds to runtime input with
low latency. For example, if you're building a chatbot, you could have a flow that starts a GPT Assistant and manages a conversation thread.
</Note>

The most important parameter to the `send_input` and `receive_input` functions is `run_type`, which should be one of the following:

- A built-in type such as `int` or `str`
- A `pydantic.BaseModel` class
- A `prefect.input.RunInput` class

<Tip>
**When to use a `BaseModel` or `RunInput` instead of a built-in type**

Most built-in types and collections of built-in types should work with `send_input` and `receive_input`, but there is a caveat with
nested collection types, such as lists of tuples. For example, `List[Tuple[str, float]])`. In this case, validation may happen after
your flow receives the data, so calling `receive_input` may raise a `ValidationError`. You can plan to catch this exception, and
consider placing the field in an explicit `BaseModel` or `RunInput` so your flow only receives exact type matches.

See examples below of `receive_input`, `send_input`, and the two functions working together.
</Tip>

### Receiving input

The following flow uses `receive_input` to continually receive names and print a personalized greeting for each name it receives:

```python
from prefect import flow
from prefect.input.run_input import receive_input


@flow
async def greeter_flow():
    async for name_input in receive_input(str, timeout=None):
        # Prints "Hello, andrew!" if another flow sent "andrew"
        print(f"Hello, {name_input}!")
```

When you pass a type such as `str` into `receive_input`, Prefect creates a `RunInput` class to manage your
input automatically. When a flow sends input of this type, Prefect uses the `RunInput` class to validate the input.
If the validation succeeds, your flow receives the input in the type you specified. In this example, if the flow received a
valid string as input, the variable `name_input` contains the string value.

If, instead, you pass a `BaseModel`, Prefect upgrades your `BaseModel` to a `RunInput` class, and the variable your flow sees
(in this case, `name_input`), is a `RunInput` instance that behaves like a `BaseModel`. If you pass in a `RunInput` class,
no upgrade is needed and you'll get a `RunInput` instance.

A simpler approach is to pass types such as `str` into `receive_input` . If you need access to the generated
`RunInput` that contains the received value, pass `with_metadata=True` to `receive_input`:

```python
from prefect import flow
from prefect.input.run_input import receive_input


@flow
async def greeter_flow():
    async for name_input in receive_input(
        str,
        timeout=None,
        with_metadata=True
    ):
        # Input will always be in the field "value" on this object.
        print(f"Hello, {name_input.value}!")

```
<Tip>
**When to use `with_metadata=True`**

The primary uses of accessing the `RunInput` object for a receive input are to respond to the sender with the `RunInput.respond()`
function, or to access the unique key for an input.
</Tip>

Notice that the printing of `name_input.value`. When Prefect generates a `RunInput` for you from a built-in type,
the `RunInput` class has a single field, `value`, that uses a type annotation matching the type you specified.
So if you call `receive_input` like this: `receive_input(str, with_metadata=True)`, it's equivalent to manually
creating the following `RunInput` class and `receive_input` call:

```python
from prefect import flow
from prefect.input.run_input import RunInput

class GreeterInput(RunInput):
    value: str

@flow
async def greeter_flow():
    async for name_input in receive_input(GreeterInput, timeout=None):
        print(f"Hello, {name_input.value}!")
```
<Warning>
**The type used in `receive_input` and `send_input` must match**

For a flow to receive input, the sender must use the same type that the receiver is receiving.
This means that if the receiver is receiving `GreeterInput`, the sender must send `GreeterInput`. If the receiver is
receiving `GreeterInput` and the sender sends the `str` input that Prefect automatically upgrades to a `RunInput` class,
the types won't match; which means the receiving flow run won't receive the input. However, the input will wait for if the flow ever
calls `receive_input(str)`.
</Warning>

### Keep track of inputs you've already seen

By default, each time you call `receive_input`, you get an iterator that iterates over all known inputs to a specific flow run,
starting with the first received. The iterator keeps track of your current position as you iterate over it, or you can call `next()`
to explicitly get the next input. If you're using the iterator in a loop, you should assign it to a variable:

```python
from prefect import flow, get_client
from prefect.deployments import run_deployment
from prefect.input.run_input import receive_input, send_input

EXIT_SIGNAL = "__EXIT__"


@flow
async def sender():
    greeter_flow_run = await run_deployment(
        "greeter/send-receive", timeout=0, as_subflow=False
    )
    client = get_client()

    # Assigning the `receive_input` iterator to a variable
    # outside of the the `while True` loop allows us to continue
    # iterating over inputs in subsequent passes through the
    # while loop without losing our position.
    receiver = receive_input(
        str,
        with_metadata=True,
        timeout=None,
        poll_interval=0.1
    )

    while True:
        name = input("What is your name? ")
        if not name:
            continue

        if name == "q" or name == "quit":
            await send_input(
                EXIT_SIGNAL,
                flow_run_id=greeter_flow_run.id
            )
            print("Goodbye!")
            break

        await send_input(name, flow_run_id=greeter_flow_run.id)

        # Saving the iterator outside of the while loop and
        # calling next() on each iteration of the loop ensures
        # that we're always getting the newest greeting. If we
        # had instead called `receive_input` here, we would
        # always get the _first_ greeting this flow received,
        # print it, and then ask for a new name.
        greeting = await receiver.next()
        print(greeting)
```

An iterator helps keep track of the inputs your flow has already received. If you want your flow to suspend and then resume later,
save the keys of the inputs you've seen so the flow can read them back out when it resumes.
Consider using a [Variable](/v3/concepts/variables/).

The following flow receives input for 30 seconds then suspends itself, which exits the flow and tears down infrastructure:

```python
from prefect import flow
from prefect.logging import get_run_logger
from prefect.flow_runs import suspend_flow_run
from prefect.variables import Variable
from prefect.context import get_run_context
from prefect.input.run_input import receive_input


EXIT_SIGNAL = "__EXIT__"


@flow
async def greeter():
    logger = get_run_logger()
    run_context = get_run_context()
    assert run_context.flow_run, "Could not see my flow run ID"

    variable_name = f"{run_context.flow_run.id}-seen-ids"

    try:
        seen_keys = await Variable.get(variable_name)
    except (ValueError, TypeError):
        seen_keys = []

    try:
        async for name_input in receive_input(
            str,
            with_metadata=True,
            poll_interval=0.1,
            timeout=30,
            exclude_keys=seen_keys
        ):
            if name_input.value == EXIT_SIGNAL:
                print("Goodbye!")
                return
            await name_input.respond(f"Hello, {name_input.value}!")

            seen_keys.append(name_input.metadata.key)
            await Variable.set(
                variable_name,
                seen_keys,
                overwrite=True
            )
    except TimeoutError:
        logger.info("Suspending greeter after 30 seconds of idle time")
        await suspend_flow_run(timeout=10000)
```

As this flow processes name input, it adds the _key_ of the flow run input to the list of seen keys.
When the flow later suspends and then resumes, it reads the keys it has already seen from the variable and
passes them as the `exlude_keys` parameter to `receive_input`.

### Respond to the input's sender

When your flow receives input from another flow, Prefect knows the sending flow run ID, so the receiving flow can
respond by calling the `respond` method on the `RunInput` instance the flow received. There are a couple of requirements:

- Pass in a `BaseModel` or `RunInput`, or use `with_metadata=True`.
- The flow you are responding to must receive the same type of input you send to see it.

The `respond` method is equivalent to calling `send_input(..., flow_run_id=sending_flow_run.id)`. But with `respond`,
your flow doesn't need to know the sending flow run's ID.

Next, make the `greeter_flow` respond to name inputs instead of printing them:

```python
from prefect import flow
from prefect.input.run_input import receive_input


@flow
async def greeter():
    async for name_input in receive_input(
        str,
        with_metadata=True,
        timeout=None
    ):
        await name_input.respond(f"Hello, {name_input.value}!")
```

However, this flow runs forever unless there's a signal that it should exit.
Here's how to make it to look for a special string:

```python
from prefect import flow
from prefect.input.run_input import receive_input



EXIT_SIGNAL = "__EXIT__"


@flow
async def greeter():
    async for name_input in receive_input(
        str,
        with_metadata=True,
        poll_interval=0.1,
        timeout=None
    ):
        if name_input.value == EXIT_SIGNAL:
            print("Goodbye!")
            return
        await name_input.respond(f"Hello, {name_input.value}!")
```

With a `greeter` flow in place, create the flow that sends `greeter` names.

### Send input

Send input to a flow with the `send_input` function. This works similarly to `receive_input` and, like that function,
accepts the same `run_input` argument. This can be a built-in type such as `str`, or else a `BaseModel` or `RunInput` subclass.

<Note>
**When to send input to a flow run**

Send input to a flow run as soon as you have the flow run's ID. The flow does not have to be receiving input for you to send input.
If you send a flow input before it is receiving, it will see your input when it calls `receive_input`
(as long as the types in the `send_input` and `receive_input` calls match).
</Note>

Next, create a `sender` flow that starts a `greeter` flow run and then enters a loop—continuously
getting input from the terminal and sending it to the greeter flow:

```python
from prefect import flow
from prefect.deployments import run_deployment

@flow
async def sender():
    greeter_flow_run = await run_deployment(
        "greeter/send-receive", timeout=0, as_subflow=False
    )
    receiver = receive_input(str, timeout=None, poll_interval=0.1)
    client = get_client()

    while True:
        flow_run = await client.read_flow_run(greeter_flow_run.id)

        if not flow_run.state or not flow_run.state.is_running():
            continue

        name = input("What is your name? ")
        if not name:
            continue

        if name == "q" or name == "quit":
            await send_input(
                EXIT_SIGNAL,
                flow_run_id=greeter_flow_run.id
            )
            print("Goodbye!")
            break

        await send_input(name, flow_run_id=greeter_flow_run.id)
        greeting = await receiver.next()
        print(greeting)
```

First, `run_deployment` starts a `greeter` flow run. This requires a deployed flow running in a process.
That process begins running `greeter` while `sender` continues to execute. Calling `run_deployment(..., timeout=0)`
ensures that `sender` won't wait for the `greeter` flow run to complete, because it's running a loop and only exits when sending `EXIT_SIGNAL`.

Next, the iterator returned by `receive_input` as `receiver` is captured. This flow works by entering a loop. On each iteration of the loop,
the flow asks for terminal input, sends that to the `greeter` flow, and then runs `receiver.next()` to wait until it receives the response from `greeter`.

Next, the terminal user who ran this flow is allowed to exit by entering the string `q` or `quit`.
When that happens, the `greeter` flow is sent an exit signal to shut down, too.

Finally, the new name is sent to `greeter`. `greeter` sends back a greeting as a string.
When you receive the greeting, print it and continue the loop that gets terminal input.

### A complete example

For a complete example of using `send_input` and `receive_input`, here is what the `greeter` and `sender` flows look like together:

```python
import asyncio
import sys
from prefect import flow, get_client
from prefect.variables import Variable
from prefect.context import get_run_context
from prefect.deployments import run_deployment
from prefect.input.run_input import receive_input, send_input


EXIT_SIGNAL = "__EXIT__"


@flow
async def greeter():
    run_context = get_run_context()
    assert run_context.flow_run, "Could not see my flow run ID"

    variable_name = f"{run_context.flow_run.id}-seen-ids"

    try:
        seen_keys = await Variable.get(variable_name)
    except (ValueError, TypeError):
        seen_keys = []

    async for name_input in receive_input(
        str,
        with_metadata=True,
        poll_interval=0.1,
        timeout=None
    ):
        if name_input.value == EXIT_SIGNAL:
            print("Goodbye!")
            return
        await name_input.respond(f"Hello, {name_input.value}!")

        seen_keys.append(name_input.metadata.key)
        await Variable.set(
            variable_name,
            seen_keys,
            overwrite=True
        )


@flow
async def sender():
    greeter_flow_run = await run_deployment(
        "greeter/send-receive", timeout=0, as_subflow=False
    )
    receiver = receive_input(str, timeout=None, poll_interval=0.1)
    client = get_client()

    while True:
        flow_run = await client.read_flow_run(greeter_flow_run.id)

        if not flow_run.state or not flow_run.state.is_running():
            continue

        name = input("What is your name? ")
        if not name:
            continue

        if name == "q" or name == "quit":
            await send_input(
                EXIT_SIGNAL,
                flow_run_id=greeter_flow_run.id
            )
            print("Goodbye!")
            break

        await send_input(name, flow_run_id=greeter_flow_run.id)
        greeting = await receiver.next()
        print(greeting)


if __name__ == "__main__":
    if sys.argv[1] == "greeter":
        asyncio.run(greeter.serve(name="send-receive"))
    elif sys.argv[1] == "sender":
        asyncio.run(sender())
```

To run the example, you need a Python environment with Prefect installed, pointed at either a Prefect Cloud account or a self-hosted Prefect server instance.

With your environment set up, start a flow runner in one terminal with the following command:

```bash
python my_file_name greeter
```

For example, with Prefect Cloud, you should see output like this:

```bash
______________________________________________________________________
| Your flow 'greeter' is being served and polling for scheduled runs |
|                                                                    |
| To trigger a run for this flow, use the following command:         |
|                                                                    |
|         $ prefect deployment run 'greeter/send-receive'            |
|                                                                    |
| You can also run your flow via the Prefect UI:                     |
| https://app.prefect.cloud/account/...(a URL for your account)      |
|                                                                    |
______________________________________________________________________
```

Then start the greeter process in another terminal:

```bash
python my_file_name sender
```

You should see output like this:

```bash
11:38:41.800 | INFO    | prefect.engine - Created flow run 'gregarious-owl' for flow 'sender'
11:38:41.802 | INFO    | Flow run 'gregarious-owl' - View at https://app.prefect.cloud/account/...
What is your name?
```

Type a name and press the enter key to see a greeting to see sending and receiving in action:

```bash
What is your name? andrew
Hello, andrew!
```
